Skip to content

xds: External Processor Server Interceptor#12889

Open
kannanjgithub wants to merge 457 commits into
grpc:masterfrom
kannanjgithub:ext-proc-server
Open

xds: External Processor Server Interceptor#12889
kannanjgithub wants to merge 457 commits into
grpc:masterfrom
kannanjgithub:ext-proc-server

Conversation

@kannanjgithub

Copy link
Copy Markdown
Contributor

Implements ext_proc server interceptor as per gRFC A93.

kannanjgithub and others added 30 commits April 29, 2026 09:41
- Added rejection of CONTINUE_AND_REPLACE status in HeadersResponse for
  both request and response headers, treating it as a stream failure.
- Fixed potential hangs by ensuring proceedWithClose() is called upon
  stream failure, especially in fail-open scenarios.
- Added explicit sidecar notification via requestStream.onError() upon
  detecting protocol errors to ensure robust stream termination.
- Added new unit test categories 17 in ExternalProcessorFilterTest
  to verify status enforcement.
Updated ExternalProcessorFilter to include the `protocol_config` field in
 the very first `ProcessingRequest` sent to the sidecar (RequestHeaders).
The configuration includes the `request_body_mode` and `response_body_mode`
derived from the filter's processing mode, as required by gRFC A93.
Added a unit test in Category 4 to verify that `protocol_config` is correctly
populated on the first message and omitted from all subsequent messages on
the stream.
nit: Renumbered out of order test categories.
…ment rather than a granular merge of its fields.
ExtProcInterceptor with inner classes ExtProcClientCall and ExtProcListener.
…_of_stream_without_message to indicate the end of the data plane stream to ext-proc.
…ad create composite isReady and onReady behaviors for the calling client application to utilize them for applying flow control.
This commit introduces configuration objects for the external authorization (ExtAuthz) filter and the gRPC service it uses. These classes provide a structured, immutable representation of the configuration defined in the xDS protobuf messages.

The main new classes are:
- `ExtAuthzConfig`: Represents the configuration for the `ExtAuthz` filter, including settings for the gRPC service, header mutation rules, and other filter behaviors.
- `GrpcServiceConfig`: Represents the configuration for a gRPC service, including the target URI, credentials, and other settings.
- `HeaderMutationRulesConfig`: Represents the configuration for header mutation rules.

This commit also includes parsers to create these configuration objects from the corresponding protobuf messages, as well as unit tests for the new classes.
…cService proto and the GrpcServiceXdsContextProvider. Also added GrpcServiceXdsContextProvider in the newInstance method for Filter in Filter.Provider.
Allow unit test to pass CacheChannelManager for in-process channel.

Fix mock ext-proc service to handle all phases of the request-response events to avoid test hanging.

The test failed with "too many messages" error due to sending an empty body message to the data plane server during the "half-close" phase . For a unary RPC, this was interpreted as a second request message, which is invalid. I've updated handleRequestBodyResponse and onExternalBody to only call super.sendMessage() or super.onMessage() if the body content is non-empty. This prevents the redundant empty message from being sent to the data plane while still allowing the external processor to signal the end of the stream.

The stream between the filter and the external processor was never being closed on the client side, causing the InProcessChannel and InProcessServer to hang during shutdown while waiting for the active RPC to terminate.
To fix this, I have updated ExternalProcessorFilter.java to ensure the control plane stream is gracefully closed when the data plane RPC completes or is cancelled.
Changes made:
1. Closing on Completion: In ExtProcClientCall.onNext, once the ResponseTrailers handshake is finished and the application has been notified via proceedWithClose(), I now call extProcClientCallRequestObserver.onCompleted().
2. Handling Cancellation: I overridden the cancel() method in ExtProcClientCall. If the data plane RPC is cancelled by the application, the filter now also cancels the external processor stream with an error, ensuring all resources are freed.
3. Observability Mode Fix: In observability mode, since we don't wait for a ResponseTrailers message from the server, I added logic to ExtProcListener.onClose() to close the external processor stream immediately after sending the final trailers.
These changes ensure proper lifecycle management of the side-channel RPC.
…Shares backpressure logic with observability mode.
…d client features

Aligns the server-side ExternalProcessorServerInterceptor with the client-side
implementation to close specification-compliance gaps. This ensures consistent
header validation, encoding, and filtering behavior across both client and server
processing paths.

Key Changes:
* ExternalProcessorServerInterceptor:
  - Added support for serializing and deserializing 'raw_value' header mutations.
  - Implemented base64 encoding/decoding for binary headers (ending in '-bin').
  - Enforced ASCII character verification for non-binary header values (rejecting
    control/invalid characters).
  - Enforced maximum header length limits and validation.
  - Integrated HeaderForwardingRulesConfig to dynamically restrict headers
    forwarded to the external processor.
  - Added comments documenting HeaderMutationFilter's behavior for invalid values.

* Ported Core Shared Utilities (from client-side ext-proc branch):
  - Aligned and updated shared components: HeaderValue, HeaderValueOption,
    HeaderValueValidationUtils, HeaderMutator, and CachedChannelManager.

* ExternalProcessorServerInterceptorTest:
  - Ported 9 new unit tests to cover binary/ASCII validation, length rules,
    and base64 serialization.
  - Corrected 'failsCall' unit test assertions to verify Status.Code.INTERNAL
    and description on the wire (since Status causes do not serialize).
  - Swapped test interceptor registration order to ensure custom headers
    are appended before the server interceptor serializes them.
By changing rawCall.close(status, trailers) to the internal helper proceedWithClose(status, trailers):

No More Double Close: We immediately transition dataPlaneCallClosed to true. When unblockAfterStreamComplete() subsequently invokes proceedWithClose(), it recognizes the call is already closed and returns immediately. The underlying transport rawCall.close() is now called exactly once, complying perfectly with the gRPC API lifecycle.
Proper Metrics Recording: By routing through proceedWithClose, the server trailers duration metric (serverTrailersDuration) is now correctly captured for these aborted calls, resolving a telemetry gap.
Robust Request Guarding: Setting dataPlaneCallClosed to true before draining the saved listener callbacks ensures that any simultaneous or subsequent attempts by the unblocked server application to call sendMessage or sendHeaders are safely ignored by our interceptor.
# Conflicts:
#	xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java
#	xds/src/main/java/io/grpc/xds/FilterRegistry.java
#	xds/src/main/java/io/grpc/xds/XdsNameResolver.java
#	xds/src/main/java/io/grpc/xds/internal/grpcservice/HeaderValue.java
#	xds/src/main/java/io/grpc/xds/internal/grpcservice/HeaderValueValidationUtils.java
#	xds/src/test/java/io/grpc/xds/ExternalProcessorFilterTest.java
#	xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java
#	xds/src/test/java/io/grpc/xds/internal/grpcservice/HeaderValueValidationUtilsTest.java
#	xds/src/test/java/io/grpc/xds/internal/headermutations/HeaderMutationFilterTest.java
#	xds/src/test/java/io/grpc/xds/internal/headermutations/HeaderMutatorTest.java
…rpc.xds.internal.extproc, with public access modifier:

1. Otel metric instruments out of the client interceptor
2. Moved `KnownLengthInputStream` used during observability mode to send message to the data plane out of the client interceptor.

Created a new class io.grpc.xds.ExternalProcessorUtil with the below. This class is kept here and not in the above sub-package because `toHeaderMap` operates on `HeaderMutationRulesConfig` which is package-private.
1. Method `outboundStreamFromByteString` used to construct the ProcessingRequest payload to Util as a static utility method.
2. Method `toHeaderMap` to Util.
…rpc.xds.internal.extproc, with public access modifier:

1. Otel metric instruments out of the client interceptor
2. Moved `KnownLengthInputStream` used during observability mode to send message to the data plane out of the client interceptor.

Created a new class io.grpc.xds.ExternalProcessorUtil with the below. This class is kept here and not in the above sub-package because `toHeaderMap` operates on `HeaderMutationRulesConfig` which is package-private.
1. Method `outboundStreamFromByteString` used to construct the ProcessingRequest payload to Util as a static utility method.
2. Method `toHeaderMap` and a few more to Util.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.